use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::Duration,
};

use crate::service::sys_service_api::xport_service;
use crate::{
    application::Application,
    protocol::{protocol_hearbeat, protocol_v1::make_ack_message},
};
use tokio::{sync::RwLock, time::sleep};
use tracing::{debug, info};

use super::channel::Channel;

/// Tracks channels by channel id and groups channel ids by connection id.
///
/// Both maps sit behind an async `RwLock` and are accessed through
/// `read().await` / `write().await`.
pub struct ChannelManager {
    // /**
    //  * Connection listeners for all channels
    //  */
    // conn_listener_map: RwLock<HashMap<i64, fn(i64, bool)>>,

    // /**
    //  * Information about directly connected channels
    //  */
    // direct_channel_map: RwLock<HashMap<i64, Arc<Box<dyn Channel>>>>,

    // Stores every channel handle, distinguished by channel id.
    // (Original note: channel information is classified by node.)
    // key = channel_id
    channel_id_map: RwLock<HashMap<i64, Arc<Box<dyn Channel>>>>,

    /**
     * key = conn_id, value = set of channel ids recorded for that conn_id
     */
    channel_conn_id_to_id_map: RwLock<HashMap<i64, HashSet<i64>>>,
}

impl ChannelManager {
    pub fn new() -> ChannelManager {
        ChannelManager {
            // conn_listener_map: RwLock::new(HashMap::new()),
            // direct_channel_map: RwLock::new(HashMap::new()),
            channel_id_map: RwLock::new(HashMap::new()),
            channel_conn_id_to_id_map: RwLock::new(HashMap::new()),
        }
    }

    /// Looks up a stored channel by its id.
    ///
    /// Returns a clone of the shared handle, or `None` when nothing is stored
    /// under `channel_id`.
    pub async fn get_channel(
        self: &Arc<Box<ChannelManager>>,
        channel_id: i64,
    ) -> Option<Arc<Box<dyn Channel>>> {
        let registered = self.channel_id_map.read().await;

        registered.get(&channel_id).cloned()
    }

    /// Returns one channel id recorded for `conn_id`.
    ///
    /// The id is whichever element the set iterator yields first, so the
    /// choice among several ids is arbitrary. Returns `None` when `conn_id`
    /// has no entry or its set is empty.
    pub async fn get_channel_by_conn_id(
        self: &Arc<Box<ChannelManager>>,
        conn_id: i64,
    ) -> Option<i64> {
        let ids_by_conn = self.channel_conn_id_to_id_map.read().await;

        ids_by_conn
            .get(&conn_id)
            .and_then(|ids| ids.iter().next())
            .copied()
    }

    /// Returns the `conn_id()` of the channel stored under `channel_id`,
    /// or `None` when no such channel is stored.
    pub async fn get_conn_id_by_channel_id(
        self: &Arc<Box<ChannelManager>>,
        channel_id: i64,
    ) -> Option<i64> {
        self.get_channel(channel_id)
            .await
            .map(|channel| channel.conn_id())
    }

    /// Removes `channel_id` from the set recorded for `conn_id`.
    ///
    /// Does nothing when `conn_id` has no entry. The set itself stays in the
    /// map even if it becomes empty.
    async fn remove_channel_id_conn_id(
        self: &Arc<Box<ChannelManager>>,
        conn_id: i64,
        channel_id: i64,
    ) {
        let mut ids_by_conn = self.channel_conn_id_to_id_map.write().await;

        if let Some(ids) = ids_by_conn.get_mut(&conn_id) {
            ids.remove(&channel_id);
        }
    }

    /// Records `channel_id` in the set of channel ids kept for `conn_id`.
    ///
    /// The set is created on demand. Previously the call was silently ignored
    /// whenever `conn_id` had no set yet, so a connection id could never be
    /// introduced through this method.
    pub async fn add_conn_id_to_channel_map(
        self: &Arc<Box<ChannelManager>>,
        conn_id: i64,
        channel_id: i64,
    ) {
        let mut ids_by_conn = self.channel_conn_id_to_id_map.write().await;

        // `entry` + `or_default` inserts an empty set for a first-seen conn_id.
        ids_by_conn.entry(conn_id).or_default().insert(channel_id);
    }

    /// Unregisters the channel stored under `channel_id` and runs its teardown.
    ///
    /// Steps, in order: remove it from the channel map, drop its id from the
    /// connection-id index, remove it from the dxc manager, call the channel's
    /// `on_close`, then notify `xport_service::channel_disconnected`.
    /// Does nothing when no channel is stored under `channel_id`.
    pub async fn remove_channel(self: &Arc<Box<ChannelManager>>, channel_id: i64) {
        // The write guard is a temporary of this statement, so the lock is
        // released before any of the awaits below.
        let removed = self.channel_id_map.write().await.remove(&channel_id);

        let channel = match removed {
            Some(channel) => channel,
            None => return,
        };

        // Drop the id from the connection-id index.
        let conn_id = channel.conn_id();
        self.remove_channel_id_conn_id(conn_id, channel_id).await;

        let airport = Application::get_airport();
        airport
            .get_dxc_manager()
            .remove_dxc_channel_id(channel_id)
            .await;

        channel.on_close().await;

        // Notify that the channel has been closed.
        let is_active = channel.is_active();
        xport_service::channel_disconnected(airport, channel_id, is_active).await;
    }

    /**
     *检查 channel 是否注册
     */
    async fn check_channel_register(self: &Arc<Box<ChannelManager>>, channel_id: i64) {
        let clone_self = self.clone();

        tokio::spawn(async move {
            sleep(Duration::from_millis(5000)).await;

            let channel = clone_self.get_channel(channel_id).await;
            if channel.is_none() {
                return;
            }

            let channel = channel.unwrap();

            let is_set_port = channel.had_set_port();

            if is_set_port {
                info!(
                    "check_channel_register channel:{} 已经设置端口号 ...",
                    channel_id
                );
                return;
            }
            info!("channel:{} 未设置端口号，现在做关闭处理...", channel_id);
            clone_self.remove_channel(channel_id).await;
        });
    }
    /**
     * 保持心跳
     */
    async fn keep_channel_alive(self: &Arc<Box<ChannelManager>>, channel_id: i64) {
        let clone_self = self.clone();
        tokio::spawn(async move {
            loop {
                sleep(Duration::from_millis(30000)).await;

                let channel = clone_self.get_channel(channel_id).await;

                if channel.is_none() {
                    return;
                }
                let channel = channel.unwrap();

                let heartbeat_message = protocol_hearbeat::make_hearbeat_message();

                debug!("发送心跳 通道id = {}...", channel_id);
                channel.send_message(&heartbeat_message).await;
            }
        });
    }
    /**
     *
     */
    async fn cycle_send_recv_msg_ack(self: &Arc<Box<ChannelManager>>, channel_id: i64) {
        let clone_self = self.clone();
        tokio::spawn(async move {
            loop {
                sleep(Duration::from_millis(50)).await;

                let channel = clone_self.get_channel(channel_id).await;
                if channel.is_none() {
                    continue;
                }
                let channel = channel.unwrap();
                let ack_ids = channel.get_all_wait_for_send_msg_and_clear();
                if ack_ids.is_empty() {
                    continue;
                }
                //
                let message = make_ack_message(ack_ids);
                channel.send_message(&message).await;
            }
        });
    }
    /// Registers `channel` under its own id and starts its background tasks.
    ///
    /// If the id is already stored, the failure is logged and nothing else
    /// happens. Otherwise:
    /// - `is_accept == true`: the registration check is scheduled;
    /// - `is_accept == false`: the heartbeat task is started;
    /// - in both cases the ack-flush task is started.
    ///
    /// A non-accepted channel is one this side connected actively, so
    /// `xport_service::channel_connected` is then called to announce it.
    ///
    /// # Panics
    ///
    /// For a non-accepted channel, panics if `get_ip_by_session_id` yields no
    /// value (the result is unwrapped).
    pub async fn add_channel(
        self: &Arc<Box<ChannelManager>>,
        channel: Arc<Box<dyn Channel>>,
        is_accept: bool,
    ) {
        let airport = Application::get_airport();
        let channel_id = channel.id();

        {
            let mut registered = self.channel_id_map.write().await;
            match registered.entry(channel_id) {
                std::collections::hash_map::Entry::Occupied(_) => {
                    info!("添加通道id = {}失败, 已经添加过...", channel_id);
                    return;
                }
                std::collections::hash_map::Entry::Vacant(slot) => {
                    slot.insert(channel);
                }
            }
        }

        if is_accept {
            self.check_channel_register(channel_id).await;
        } else {
            self.keep_channel_alive(channel_id).await;
        }
        self.cycle_send_recv_msg_ack(channel_id).await;

        if is_accept {
            return;
        }

        // Actively initiated connection: announce it explicitly.
        let ip = airport
            .xprc_net
            .get_ip_by_session_id(channel_id)
            .await
            .unwrap();
        xport_service::channel_connected(airport, channel_id, ip, false).await;
    }

    /// Returns a channel id for `conn_id`, building a new channel if needed.
    ///
    /// If the connection-id index already holds at least one channel id for
    /// `conn_id`, an arbitrary one of them is returned. Otherwise
    /// `airport.build_xprc_channel(conn_id)` is awaited; on success the new id
    /// is recorded in the index and returned, on failure `None` is returned.
    pub async fn build_xprc_channel(&self, conn_id: i64) -> Option<i64> {
        let airport = Application::get_airport();

        // Reuse any channel already recorded for this connection. This is a
        // pure lookup, so a shared read lock is enough and does not block
        // other readers.
        {
            let ids_by_conn = self.channel_conn_id_to_id_map.read().await;
            let existing = ids_by_conn
                .get(&conn_id)
                .and_then(|ids| ids.iter().next());
            if let Some(&channel_id) = existing {
                return Some(channel_id);
            }
        }

        // Original note: this call can cause callers to queue up.
        let channel_id = airport.build_xprc_channel(conn_id).await?;

        // Record the freshly built channel in the index.
        self.channel_conn_id_to_id_map
            .write()
            .await
            .entry(conn_id)
            .or_default()
            .insert(channel_id);

        Some(channel_id)
    }

    /// Returns every connection id that currently has an entry in the
    /// connection-id index, in arbitrary order.
    ///
    /// Ids whose channel set is empty are included, because only the map keys
    /// are inspected.
    pub async fn get_all_conn_id(&self) -> Vec<i64> {
        // Read-only access: a shared lock avoids blocking other readers.
        let ids_by_conn = self.channel_conn_id_to_id_map.read().await;

        ids_by_conn.keys().copied().collect()
    }
}
